package com.zhu.utils;

/**
 * Flink SQL helper that builds DDL statements for tables backed by MySQL.
 *
 * <p>All members are static; the class is a pure utility holder and cannot be instantiated.
 */
public final class MySqlUtil {

    /** JDBC URL placed in the {@code 'url'} option of every generated table. */
    private static final String JDBC_URL = "jdbc:mysql://hadoop102:3306/flink?useSSL=false";

    // NOTE(review): credentials are hard-coded in source. They are kept as-is so the generated
    // DDL does not change, but they should be moved to external configuration.
    /** Value of the {@code 'username'} option. */
    private static final String JDBC_USERNAME = "root";

    /** Value of the {@code 'password'} option. */
    private static final String JDBC_PASSWORD = "000000";

    // NOTE(review): this looks like the legacy Connector/J driver class name; newer Connector/J
    // releases use "com.mysql.cj.jdbc.Driver" -- confirm against the driver on the classpath.
    /** Value of the {@code 'driver'} option. */
    private static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";

    /** Maximum number of rows kept in the lookup cache. */
    private static final String LOOKUP_CACHE_MAX_ROWS = "10";

    /** Maximum time a row is kept in the lookup cache. */
    private static final String LOOKUP_CACHE_TTL = "1 hour";

    /** Not instantiable: this class only exposes static helpers. */
    private MySqlUtil() {
    }

    /**
     * Builds the DDL that exposes the MySQL dictionary table {@code base_dic} as a Flink lookup
     * table, so it can be joined against for dimension degeneration.
     *
     * @return the complete {@code create table} statement, i.e. the column list followed by the
     *         JDBC connector {@code WITH} clause
     */
    public static String getBaseDicLooKupDDL() {
        String columns = "`dic_code` string, "
                + "`dic_name` string, "
                + "`parent_code` string, "
                + "`create_time` string, "
                + "`operate_time` string, "
                + "primary key(`dic_code`) not enforced ";
        return "create table `base_dic` (" + columns + ")" + mysqlLookUpTableDDL("base_dic");
    }

    /**
     * Builds the {@code WITH (...)} clause that binds a Flink table to a MySQL table through the
     * JDBC connector, with the lookup cache options set.
     *
     * @param tableName name of the MySQL table; inserted verbatim between single quotes as the
     *                  {@code 'table-name'} option
     * @return the {@code WITH} clause, one option per line
     */
    private static String mysqlLookUpTableDDL(String tableName) {
        return "WITH (\n"
                + "'connector' = 'jdbc',\n"
                + "'url' = '" + JDBC_URL + "',\n"
                + "'table-name' = '" + tableName + "',\n"
                + "'lookup.cache.max-rows' = '" + LOOKUP_CACHE_MAX_ROWS + "',\n"
                + "'lookup.cache.ttl' = '" + LOOKUP_CACHE_TTL + "',\n"
                + "'username' = '" + JDBC_USERNAME + "',\n"
                + "'password' = '" + JDBC_PASSWORD + "',\n"
                + "'driver' = '" + JDBC_DRIVER + "'\n"
                + ")";
    }
}
